package day05;

import beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

/**
 * Flink Table API &amp; SQL — defining a processing-time attribute.
 *
 * <p>Demonstrates the three ways to declare a processing-time attribute on a table:
 * <ol>
 *   <li>when converting a {@code DataStream} into a {@code Table};</li>
 *   <li>in the {@code Schema} descriptor of a connected external table;</li>
 *   <li>as a computed column ({@code PROCTIME()}) in a CREATE TABLE DDL statement.</li>
 * </ol>
 *
 * @author lvbingbing
 * @date 2022-01-19 00:22
 */
public class FlinkTableApi05 {
    public static void main(String[] args) throws Exception {
        // 1. Obtain the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism of 1 keeps the printed output in a single, ordered stream
        int parallelism = 1;
        env.setParallelism(parallelism);
        // 2. Define the processing-time attribute (three variants)
        defineProcessTime(env);
        // 3. Trigger job execution (Flink jobs are lazy until execute())
        env.execute();
    }

    /**
     * Runs all three processing-time definition variants.
     *
     * @param env the stream execution environment <br>
     */
    private static void defineProcessTime(StreamExecutionEnvironment env) {
        // 1. Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 2. Declare processing time while converting a DataStream into a Table
        defineOnDataStream(tableEnv, env);
        // 3. Declare processing time in the Table Schema descriptor
        defineOnTableSchema(tableEnv);
        // 4. Declare processing time in the CREATE TABLE DDL
        defineOnDdl(env);
    }

    /**
     * Declares a processing-time attribute while converting a {@code DataStream} into a table.
     * The trailing {@code pt.proctime} in the field expression appends a processing-time column.
     *
     * @param tableEnv the table environment
     * @param env      the stream execution environment
     */
    private static void defineOnDataStream(StreamTableEnvironment tableEnv, StreamExecutionEnvironment env) {
        DataStream<SensorReading> dataStreamSource = env.readTextFile("input/sensor.txt").map(e -> {
            String[] split = e.split(",");
            // Long.parseLong / Double.parseDouble instead of the deprecated
            // boxing constructors new Long(String) / new Double(String);
            // identical parsing semantics (NumberFormatException on bad input)
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });
        Table table = tableEnv.fromDataStream(dataStreamSource, "id, temperature, timestamp, pt.proctime");
        table.printSchema();
    }

    /**
     * Declares a processing-time attribute in the {@code Schema} descriptor when connecting
     * to an external (filesystem/CSV) table.
     *
     * @param tableEnv the table environment
     */
    private static void defineOnTableSchema(StreamTableEnvironment tableEnv) {
        tableEnv.connect(new FileSystem().path("input/sensor.txt"))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                        // .proctime() marks "pt" as the processing-time attribute
                        .field("pt", DataTypes.TIMESTAMP(3)).proctime()
                )
                .createTemporaryTable("inputTable");
        Table inputTable = tableEnv.from("inputTable");
        inputTable.printSchema();
        DataStream<Row> rowDataStream = tableEnv.toAppendStream(inputTable, Row.class);
        rowDataStream.print("rowDataStream");
    }

    /**
     * Declares a processing-time attribute as a computed column in a CREATE TABLE DDL.
     *
     * @param env the stream execution environment
     */
    private static void defineOnDdl(StreamExecutionEnvironment env) {
        // The legacy (flink) planner does not support the built-in PROCTIME()
        // function in DDL; the Blink planner is required.
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);
        String sql = "create table dataTable (" +
                "id varchar(20) not null, " +
                "ts bigint, " +
                "temperature double, " +
                "pt AS PROCTIME()" +
                ") with (" +
                "'connector.type' = 'filesystem', " +
                "'connector.path' = 'input/sensor.txt', " +
                "'format.type' = 'csv'" +
                ")";
        tableEnv.sqlUpdate(sql);
        Table dataTable = tableEnv.from("dataTable");
        Table selectTable = dataTable.select("id, ts, pt")
                .where("id = 'sensor_1'");
        DataStream<Row> rowDataStream = tableEnv.toAppendStream(selectTable, Row.class);
        rowDataStream.print("rowDataStream");
    }
}